home *** CD-ROM | disk | FTP | other *** search
- # vim:set et sts=4 sw=4:
- #
- # ibus - The Input Bus
- #
- # Copyright (c) 2007-2008 Huang Peng <shawn.p.huang@gmail.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this program; if not, write to the
- # Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- # Boston, MA 02111-1307 USA
-
- __all__ = (
- "Bus",
- )
-
- import dbus
- import dbus.lowlevel
- import dbus.connection
- import dbus.mainloop.glib
- import gobject
- import common
- import object
- import serializable
- import config
-
- dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
-
- class Bus(object.Object):
- __gtype_name__ = "PYIBusBus"
- __gsignals__ = {
- "disconnected" : (
- gobject.SIGNAL_RUN_LAST,
- gobject.TYPE_NONE,
- ()
- ),
- "config-reloaded" : (
- gobject.SIGNAL_RUN_LAST,
- gobject.TYPE_NONE,
- ()
- ),
- }
-
- def __init__(self):
- super(Bus, self).__init__()
- self.__dbusconn = dbus.connection.Connection(common.get_address())
- self.__dbus = self.__dbusconn.get_object(dbus.BUS_DAEMON_NAME,
- dbus.BUS_DAEMON_PATH)
- self.__unique_name = self.hello()
- self.__ibus = self.__dbusconn.get_object(common.IBUS_SERVICE_IBUS,
- common.IBUS_PATH_IBUS)
-
- self.__dbusconn.call_on_disconnection(self.__dbusconn_disconnected_cb)
- # self.__dbusconn.add_message_filter(self.__filter_cb)
-
- def __filter_cb(self, conn, message):
- if message.get_type() == 4:
- print "Signal %s" % message.get_member()
- print " sender = %s" % message.get_sender()
- print " path = %s" % message.get_path()
- return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
-
- def __dbusconn_disconnected_cb(self, dbusconn):
- assert self.__dbusconn == dbusconn
- self.__dbusconn = None
- self.emit("disconnected")
-
- def get_name(self):
- return self.__unique_name
-
- def get_is_connected(self):
- if self.__dbusconn == None:
- return False
- return self.__dbusconn.get_is_connected()
-
- # define dbus methods
- def get_dbus(self):
- return self.__dbus
-
- def hello(self):
- return self.__dbus.Hello()
-
- def request_name(self, name, flags):
- return self.__dbus.RequestName(name, dbus.UInt32 (flags))
-
- def release_name(self, name):
- return self.__dbus.ReleaseName(name)
-
- def get_name_owner(self, name):
- return self.__dbus.GetNameOwner(name)
-
- def add_match(self, rule):
- return self.__dbus.AddMatch(rule)
-
- def remove_match(self, rule):
- return self.__dbus.RemoveMatch(rule)
-
- def get_dbusconn(self):
- return self.__dbusconn
-
- def get_address(self):
- return common.get_address()
-
- # define ibus methods
- def register_component(self, component):
- component = serializable.serialize_object(component)
- return self.__ibus.RegisterComponent(component)
-
- def list_engines(self):
- engines = self.__ibus.ListEngines()
- return map(serializable.deserialize_object, engines)
-
- def list_active_engines(self):
- engines = self.__ibus.ListActiveEngines()
- return map(serializable.deserialize_object, engines)
-
- def create_input_context(self, client_name):
- return self.__ibus.CreateInputContext(client_name)
-
- def exit(self, restart):
- return self.__ibus.Exit(restart)
-
- def ping(self, data):
- flag = isinstance(data, serializable.Serializable)
- if flag:
- data = serializable.serialize_object(data)
- data = self.__ibus.Ping(data, dbus_interface="org.freedesktop.IBus")
- if flag:
- data = serializable.deserialize_object(data)
- return data
-
- def introspect_ibus(self):
- return self.__ibus.Introspect()
-
- def introspect_dbus(self):
- return self.__dbus.Introspect()
-
- def get_config(self):
- try:
- return self.__config
- except:
- self.__config = config.Config(self)
- return self.__config
-
- def test():
- import glib
- import factory
- import text
-
- mainloop = glib.MainLoop()
-
- def __disconnected_cb(*args):
- print "Disconnected", args
- mainloop.quit()
-
- b = Bus()
- b.connect("disconnected", __disconnected_cb)
-
- print "unique_name =", b.get_name()
-
- for i in b.list_factories():
- print i.name
-
- mainloop.run()
- print "Exit"
-
-
- if __name__ == "__main__":
- test()
-